/**
 * Copyright (C) 2010, 2011 Neofonie GmbH
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package eu.dicodeproject.analysis.twitter;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;

import eu.dicodeproject.analysis.hbase.TweetCols;

/**
 *
 */
public class SchemaUpdaterMapper extends TableMapper<NullWritable, NullWritable> {

  /** Column names. */
  private static final byte[] OLD_CREATION_DATE = Bytes.toBytes("creationDate");
  private static final byte[] OLD_FROM = Bytes.toBytes("from");
  private static final byte[] OLD_FROM_ID = Bytes.toBytes("fromId");
  private static final byte[] OLD_GEO = Bytes.toBytes("geo");
  private static final byte[] OLD_IMAGE_URL = Bytes.toBytes("imageUrl");
  private static final byte[] OLD_LANG = Bytes.toBytes("lang");
  private static final byte[] OLD_SOURCE = Bytes.toBytes("source");
  private static final byte[] OLD_TEXT = Bytes.toBytes("text");
  private static final byte[] OLD_TO = Bytes.toBytes("to");

  private byte[] sourceTextFamily;
  private byte[] sourceMetaFamily;
  private byte[] table;
  private byte[] family;

  private final DateFormat dateFormat = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss +SSS");

  private HTablePool hTablePool;

  /*
   * (non-Javadoc)
   * 
   * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object,
   * java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
   */
  @Override
  public void map(ImmutableBytesWritable key, Result tweet, Context context) throws IOException, InterruptedException {
    Put put = new Put(key.get());

    NavigableMap<byte[], NavigableMap<byte[], byte[]>> tweetMap = tweet.getNoVersionMap();
    NavigableMap<byte[], byte[]> textMap = tweetMap.get(this.sourceTextFamily);

    addToPut(put, TweetCols.TEXT.bytes(), textMap.get(OLD_TEXT));
    addToPut(put, TweetCols.FROM_ID.bytes(), textMap.get(OLD_FROM_ID));

    NavigableMap<byte[], byte[]> metaMap = tweetMap.get(this.sourceMetaFamily);

    addToPut(put, TweetCols.FROM.bytes(), metaMap.get(OLD_FROM));
    addToPut(put, TweetCols.SOURCE.bytes(), metaMap.get(OLD_SOURCE));
    addToPut(put, TweetCols.TO.bytes(), metaMap.get(OLD_TO));
    addToPut(put, TweetCols.GEO.bytes(), metaMap.get(OLD_GEO));
    addToPut(put, TweetCols.IMAGE_URL.bytes(), metaMap.get(OLD_IMAGE_URL));
    addToPut(put, TweetCols.LANG.bytes(), metaMap.get(OLD_LANG));

    try {
      // unfortunately we have to parse the date..
      String creationDate = Bytes.toString(metaMap.get(OLD_CREATION_DATE));
      byte[] creationDateUnix = Bytes.toBytes(dateFormat.parse(creationDate).getTime());
      addToPut(put, TweetCols.CREATION_DATE.bytes(), creationDateUnix);
    } catch (ParseException e) {
      context.getCounter("SchemaUpdaterMapper", "ParseException").increment(1l);
    }

    HTableInterface table = this.hTablePool.getTable(this.table);
    table.put(put);
    this.hTablePool.putTable(table);
  }

  private void addToPut(Put put, byte[] col, byte[] value) {
    if (value != null && value.length > 0) {
      put.add(this.family, col, value);
    }
  }

  /*
   * (non-Javadoc)
   * 
   * @see
   * org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper
   * .Context)
   */
  @Override
  public void setup(Context context) throws IOException, InterruptedException {
    Configuration cfg = context.getConfiguration();

    this.sourceTextFamily = cfg.get("sourceTextFamily").getBytes();
    this.sourceMetaFamily = cfg.get("sourceMetaFamily").getBytes();
    this.table = cfg.get("table").getBytes();
    this.family = cfg.get("family").getBytes();

    this.hTablePool = new HTablePool(cfg, 10);

    super.setup(context);
  }
}
